package com.nx.platform.es.biz.esspider.sink;

import com.google.common.base.Preconditions;
import com.nx.platform.es.biz.esspider.entity.Item;
import com.nx.platform.es.common.utils.*;
import com.nx.platform.es.service.ESClientManager;
import com.nx.platform.es.system.config.ConfigCenter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.jetbrains.annotations.NotNull;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * @author
 * @date 2018/01/25
 */
@Slf4j
public class SinksImpl implements Sinks {

    private final String configKey;
    private final ESClientManager esManager;
    private final String[] targets;

    public SinksImpl(String configKey, ESClientManager esManager) {
        this.configKey = configKey;
        this.esManager = esManager;
        this.targets = new String[0];
    }

    public SinksImpl(String configKey, ESClientManager esManager, String targets) {
        this.configKey = configKey;
        this.esManager = esManager;
        this.targets = MoreFunctions.toStringArray(targets, MoreSplitters.COMMA);
    }

    @Override
    public void check(String code) throws RuntimeException {
        List<Sink> sinks = sinkMap(code);
        if (sinks == null || sinks.isEmpty()) {
            throw new IllegalStateException("sink empty, code=" + code);
        }
    }

    @SuppressWarnings("unchecked")
    private List<Sink> sinkMap(String code) {
        return (List<Sink>) ConfigCenter.getConfig(configKey + "." + code, settings -> {
            List<Map<?, ?>> sinks = YamlParser.parseToList(settings);
            Preconditions.checkState(!sinks.isEmpty(), "sinks empty, code=" + code);
            return sinks.stream()
                    .filter(sink -> targets.length == 0
                            || ArrayUtils.contains(targets, MoreMaps.getString(sink, Constants.SINKS_ID)))
                    .map(config -> {
                        try {
                            return SinkFactory.createSink(esManager, config);
                        } catch (Exception e) {
                            log.error(e.getMessage());
                            throw new RuntimeException(e);
                        }
                    }).collect(Collectors.toList());
        }).orElse(Collections.emptyList());
    }

    @Override
    public @NotNull Map<Long, Item> accept(String code, Map<Long, Item> items) {
        List<Sink> sinks = sinkMap(code);
        if (CollectionUtils.isEmpty(sinks)) {
            throw new IllegalStateException("sinks empty or not found, code=" + code);
        }
        Map<Long, Item> failedItems = new LinkedHashMap<>();
        sinks.forEach(sink -> failedItems.putAll(sink.accept(items)));
        return failedItems;
    }

}
